﻿// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using Dapr.Common;
using Dapr.Jobs.Models;
using Dapr.Jobs.Models.Responses;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

namespace Dapr.Jobs;

/// <summary>
/// A gRPC-based client for scheduling, retrieving, and deleting jobs through the Dapr Jobs API.
/// </summary>
[Experimental("DAPR_JOBS", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/jobs/jobs-overview/")]
internal sealed class DaprJobsGrpcClient(Autogenerated.Dapr.DaprClient client, HttpClient httpClient, string? daprApiToken = null) : DaprJobsClient(client, httpClient, daprApiToken: daprApiToken)
{
    /// <summary>
    /// Schedules a job with Dapr.
    /// </summary>
    /// <remarks>
    /// All arguments are validated before the request is built, so an invalid argument never results in a call
    /// to the Dapr endpoint. If the call is canceled through <paramref name="cancellationToken"/>, the resulting
    /// cancellation exception is swallowed and the method completes without throwing.
    /// </remarks>
    /// <param name="jobName">The name of the job being scheduled. Must not be null, empty, or whitespace.</param>
    /// <param name="schedule">The schedule defining when the job will be triggered.</param>
    /// <param name="payload">The main payload of the job.</param>
    /// <param name="startingFrom">The optional point-in-time from which the job schedule should start. When specified, it is sent as the job's due time and replaces any due time taken from a point-in-time <paramref name="schedule"/>.</param>
    /// <param name="repeats">The optional number of times the job should be triggered. Must not be negative.</param>
    /// <param name="ttl">Represents when the job should expire. If both this and <paramref name="startingFrom"/> are set, the TTL needs to represent a later point in time.</param>
    /// <param name="overwrite">A flag indicating whether the job should be overwritten when submitted (true); otherwise false to require that an existing job with the same name be deleted first.</param>
    /// <param name="failurePolicyOptions">The characteristics of the policy to apply when a job fails to trigger.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <exception cref="ArgumentNullException"><paramref name="jobName"/> or <paramref name="schedule"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="jobName"/> is empty or whitespace, or <paramref name="ttl"/> is not later than <paramref name="startingFrom"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeats"/> is negative.</exception>
    /// <exception cref="DaprException">The call to the Dapr endpoint failed for a reason other than caller-requested cancellation.</exception>
    public override async Task ScheduleJobAsync(string jobName, DaprJobSchedule schedule,
        ReadOnlyMemory<byte>? payload = null, DateTimeOffset? startingFrom = null, int? repeats = null,
        DateTimeOffset? ttl = null, bool overwrite = false, IJobFailurePolicyOptions? failurePolicyOptions = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(jobName, nameof(jobName));
        ArgumentNullException.ThrowIfNull(schedule, nameof(schedule));

        // The get/delete operations reject blank job names, so scheduling must not accept them either.
        if (string.IsNullOrWhiteSpace(jobName))
        {
            throw new ArgumentException("The job name must not be empty or consist only of whitespace.", nameof(jobName));
        }

        if (repeats is < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(repeats), repeats, "The number of repeats must not be negative.");
        }

        if (ttl is not null && startingFrom is not null && ttl <= startingFrom)
        {
            throw new ArgumentException(
                $"When both {nameof(ttl)} and {nameof(startingFrom)} are specified, {nameof(ttl)} must represent a later point in time");
        }

        var job = new Autogenerated.Job { Name = jobName };

        // Set up the schedule (recurring or point in time)
        if (schedule.IsPointInTimeExpression)
        {
            job.DueTime = schedule.ExpressionValue;
        }
        else if (schedule.IsCronExpression || schedule.IsPrefixedPeriodExpression || schedule.IsDurationExpression)
        {
            job.Schedule = schedule.ExpressionValue;
        }

        // An explicit start time takes precedence over a due time taken from the schedule.
        if (startingFrom is not null)
        {
            job.DueTime = startingFrom.Value.ToString("O");
        }

        if (repeats is not null)
        {
            job.Repeats = (uint)repeats.Value;
        }

        if (payload is not null)
        {
            job.Data = new Any { Value = ByteString.CopyFrom(payload.Value.Span), TypeUrl = "dapr.io/schedule/jobpayload" };
        }

        if (ttl is not null)
        {
            job.Ttl = ttl.Value.ToString("O");
        }

        if (failurePolicyOptions is not null && CreateFailurePolicy(failurePolicyOptions) is { } failurePolicy)
        {
            job.FailurePolicy = failurePolicy;
        }

        var envelope = new Autogenerated.ScheduleJobRequest { Job = job, Overwrite = overwrite };

        var grpcCallOptions = DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprJobsClient).Assembly, this.DaprApiToken, cancellationToken);

        try
        {
            await Client.ScheduleJobAlpha1Async(envelope, grpcCallOptions).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            //Ignore our own cancellation
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && cancellationToken.IsCancellationRequested)
        {
            // Ignore a remote cancellation due to our own cancellation
        }
        catch (Exception ex)
        {
            throw new DaprException(
                "Schedule job operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                ex);
        }
    }

    /// <summary>
    /// Translates the SDK failure policy options into their gRPC representation.
    /// </summary>
    /// <param name="options">The failure policy options supplied by the caller.</param>
    /// <returns>The gRPC failure policy, or null when the policy kind is not one this client maps.</returns>
    private static Autogenerated.JobFailurePolicy? CreateFailurePolicy(IJobFailurePolicyOptions options)
    {
        switch (options.Policy)
        {
            case JobFailurePolicy.Drop:
                return new Autogenerated.JobFailurePolicy
                {
                    Drop = new Autogenerated.JobFailurePolicyDrop()
                };
            case JobFailurePolicy.Constant:
                var constantOptions = (JobFailurePolicyConstantOptions)options;
                var constant = new Autogenerated.JobFailurePolicyConstant
                {
                    Interval = constantOptions.Interval.ToDuration()
                };

                // Only assign the retry limit when one was provided so the field stays unset otherwise.
                if (constantOptions.MaxRetries is not null)
                {
                    constant.MaxRetries = (uint)constantOptions.MaxRetries;
                }

                return new Autogenerated.JobFailurePolicy { Constant = constant };
            default:
                return null;
        }
    }

    /// <summary>
    /// Retrieves the details of a registered job.
    /// </summary>
    /// <remarks>
    /// If the call is canceled through <paramref name="cancellationToken"/>, the cancellation exception itself is
    /// swallowed; because no job details are available in that case, a <see cref="DaprException"/> is thrown instead.
    /// </remarks>
    /// <param name="jobName">The name of the job. Must not be null, empty, or whitespace.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The details comprising the job.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="jobName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="DaprException">The call to the Dapr endpoint failed, the response could not be deserialized, or the call was canceled.</exception>
    public override async Task<DaprJobDetails> GetJobAsync(string jobName, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(jobName))
        {
            throw new ArgumentNullException(nameof(jobName));
        }

        try
        {
            var envelope = new Autogenerated.GetJobRequest { Name = jobName };
            var grpcCallOptions = DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprJobsClient).Assembly, this.DaprApiToken, cancellationToken);

            // Library code: do not capture the caller's synchronization context (matches ScheduleJobAsync).
            var response = await Client.GetJobAlpha1Async(envelope, grpcCallOptions).ConfigureAwait(false);
            return DeserializeJobResponse(response);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            //Ignore our own cancellation
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && cancellationToken.IsCancellationRequested)
        {
            // Ignore a remote cancellation due to our own cancellation
        }
        catch (Exception ex)
        {
            throw new DaprException(
                "Get job operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
        }

        // Only reached when a cancellation was swallowed above: there is no result to hand back.
        throw new DaprException("Get job operation failed: the Dapr endpoint did not return the expected value.");
    }

    /// <summary>
    /// Testable method for performing job response deserialization.
    /// </summary>
    /// <remarks>
    /// <para>This is exposed strictly for testing purposes.</para>
    /// <para>
    /// Timestamps are parsed with the invariant culture so the result does not depend on the culture of the
    /// calling thread. When the job carries a schedule expression, that expression is used for the returned
    /// schedule even if a due time is also present; the due time is still reported separately. A due time
    /// that cannot be parsed as a timestamp is reported as null rather than causing a failure.
    /// </para>
    /// <para>
    /// The payload is taken from the value bytes of the job's data envelope, which is where
    /// <see cref="ScheduleJobAsync"/> stores the caller-supplied payload.
    /// </para>
    /// </remarks>
    /// <param name="response">The job response to deserialize.</param>
    /// <returns>The deserialized job response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="response"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="response"/> does not contain a job.</exception>
    /// <exception cref="FormatException">The job's TTL is present but is not a parseable timestamp.</exception>
    internal static DaprJobDetails DeserializeJobResponse(Autogenerated.GetJobResponse response)
    {
        ArgumentNullException.ThrowIfNull(response, nameof(response));

        var job = response.Job ??
                  throw new ArgumentException("The response does not contain a job.", nameof(response));

        // Parse the due time once, culture-independently, and reuse the outcome below.
        var hasDueTime = DateTime.TryParse(job.DueTime, CultureInfo.InvariantCulture, DateTimeStyles.None, out var dueTime);

        // A schedule expression wins over the due time; otherwise a recurring job that was scheduled with a
        // start time would be reported as a one-off point-in-time job.
        var schedule = hasDueTime && string.IsNullOrWhiteSpace(job.Schedule)
            ? DaprJobSchedule.FromDateTime(dueTime)
            : new DaprJobSchedule(job.Schedule);

        return new DaprJobDetails(schedule)
        {
            DueTime = hasDueTime ? dueTime : null,
            Ttl = !string.IsNullOrWhiteSpace(job.Ttl) ? DateTime.Parse(job.Ttl, CultureInfo.InvariantCulture) : null,
            RepeatCount = (int)job.Repeats,
            // Read the payload bytes themselves, not the serialized Any envelope (which includes the type URL).
            Payload = job.Data?.Value.ToByteArray()
        };
    }

    /// <summary>
    /// Deletes the specified job.
    /// </summary>
    /// <remarks>
    /// If the call is canceled through <paramref name="cancellationToken"/>, the resulting cancellation exception
    /// is swallowed and the method completes without throwing.
    /// </remarks>
    /// <param name="jobName">The name of the job. Must not be null, empty, or whitespace.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A task that completes once the delete request has finished or has been canceled.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="jobName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="DaprException">The call to the Dapr endpoint failed for a reason other than caller-requested cancellation.</exception>
    public override async Task DeleteJobAsync(string jobName, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(jobName))
        {
            throw new ArgumentNullException(nameof(jobName));
        }

        try
        {
            var envelope = new Autogenerated.DeleteJobRequest { Name = jobName };
            var grpcCallOptions = DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprJobsClient).Assembly, this.DaprApiToken, cancellationToken);

            // Library code: do not capture the caller's synchronization context (matches ScheduleJobAsync).
            await Client.DeleteJobAlpha1Async(envelope, grpcCallOptions).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            //Ignore our own cancellation
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && cancellationToken.IsCancellationRequested)
        {
            // Ignore a remote cancellation due to our own cancellation
        }
        catch (Exception ex)
        {
            throw new DaprException(
                "Delete job operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                ex);
        }
    }

    /// <summary>
    /// Releases the resources held by this client.
    /// </summary>
    /// <param name="disposing">
    /// When true, the HTTP client held by this instance is disposed; when false, nothing is released.
    /// </param>
    protected override void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        this.HttpClient.Dispose();
    }
}
